package onion.mqtt.client;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.mqtt.*;
import org.apache.commons.lang3.ObjectUtils;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/14
 */
@ChannelHandler.Sharable
public class MqttClientInboundHandler extends SimpleChannelInboundHandler<MqttMessage> {

    private final MqttClientProcessor processor;

    public MqttClientInboundHandler() {
        this.processor = new MqttClientProcessor();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage message) throws Exception {
        if (ObjectUtils.isEmpty(message)) {
            return;
        }
        if (message.decoderResult().isFailure()) {

        }
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        switch (mqttFixedHeader.messageType()) {
            case CONNACK:
                processor.processConnAck(ctx.channel(), (MqttConnAckMessage) message);
                break;
            case SUBACK:
                processor.processSubAck(ctx.channel(), (MqttSubAckMessage) message);
                break;
            case UNSUBACK:
                processor.processUnsubAck(ctx.channel(), message);
                break;
            case PUBLISH:
                processor.processPublish(ctx.channel(), (MqttPublishMessage) message);
                break;
            case PUBACK:
                processor.processPubAck(ctx.channel(), message);
                break;
            case PUBREC:
                processor.processPubRec(ctx.channel(), message);
                break;
            case PUBREL:
                processor.processPubRel(ctx.channel(), message);
                break;
            case PUBCOMP:
                processor.processPubComp(ctx.channel(), message);
                break;
            default:
                break;
        }
    }
}
